package com.yc.bigdata.flink.demo;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * <p>Flink sink that opens a JDBC connection to MySQL and prepares an insert
 * statement for the {@code Student} table.</p>
 *
 * <p>{@link #open(Configuration)} creates the connection and the prepared statement,
 * and {@link #close()} releases both. {@link #invoke(String, Context)} is currently a
 * no-op, so no rows are written yet.</p>
 *
 * @author: YuanChilde
 * @date: 2020-02-11 11:46
 * @version: 1.0
 * Modification History:
 * Date    Author      Version     Description
 * -----------------------------------------------------------------
 * 2020-02-11 11:46    YuanChilde     1.0        Created
 */
public class SinkToMySQL extends RichSinkFunction<String> {

    // NOTE: replace the settings below with your own MySQL address, user name and password.
    private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
    private static final String JDBC_URL = "jdbc:mysql://localhost:3306/test";
    private static final String JDBC_USER = "root";
    private static final String JDBC_PASSWORD = "root123456";

    /** Insert statement prepared in {@link #open(Configuration)}; {@code null} until then. */
    transient PreparedStatement ps;

    /** JDBC connection created in {@link #open(Configuration)}; {@code null} until then. */
    private transient Connection connection;

    /**
     * Opens the JDBC connection and prepares the insert statement.
     *
     * @param parameters configuration passed through to {@code super.open}
     * @throws Exception if the driver class cannot be loaded, the connection cannot be
     *                   established, or the statement cannot be prepared
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Load the driver class explicitly before asking DriverManager for a connection.
        Class.forName(JDBC_DRIVER);
        // The connection must exist before a statement can be prepared on it; without this
        // assignment the prepareStatement call below would fail with a NullPointerException.
        connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
        String sql = "insert into Student(id, name, password, age) values(?, ?, ?, ?);";
        ps = connection.prepareStatement(sql);
        System.out.println("open=====================");
    }

    /**
     * Closes the prepared statement and the connection.
     *
     * <p>The nested {@code try}/{@code finally} blocks make sure the connection is still
     * closed when {@code super.close()} or closing the statement throws.</p>
     *
     * @throws Exception if {@code super.close()} or releasing a JDBC resource fails
     */
    @Override
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            try {
                if (ps != null) {
                    ps.close();
                }
            } finally {
                // Close the connection and release its resources.
                if (connection != null) {
                    connection.close();
                }
            }
        }
        System.out.println("close=====================");
    }

    /**
     * Receives one record from the stream. Currently does nothing.
     *
     * @param value   the incoming record
     * @param context sink context supplied by the caller
     * @throws Exception declared by the overridden method; not thrown by this implementation
     */
    @Override
    public void invoke(String value, Context context) throws Exception {
        // Intentionally empty: how the incoming String maps to the (id, name, password, age)
        // statement parameters is not defined in this class.
        // Earlier draft, written for an iterable of Student records rather than a String:
      /*  for (Student student : value) {
            ps.setInt(1, student.getId());
            ps.setString(2, student.getName());
            ps.setString(3, student.getPassword());
            ps.setInt(4, student.getAge());
            ps.addBatch();
        }
        int[] count = ps.executeBatch(); // execute after batching */
    }
}
